package net.quanter.shield.mq.rocketmq.consumer;

import com.aliyun.openservices.ons.api.*;
import lombok.extern.slf4j.Slf4j;
import net.quanter.shield.mq.MQConsumer;
import net.quanter.shield.mq.MQMessageVO;
import net.quanter.shield.mq.RunType;
import net.quanter.shield.mq.rocketmq.param.RocketMQBorkerParam;
import net.quanter.shield.mq.rocketmq.utils.ObjectUtils;

import java.lang.reflect.Type;
import java.util.*;

@Slf4j
public class RocketMQTcpConsumer implements MQConsumer {

    final RocketMQBorkerParam mqConnectVO;
    private final String groupId;
    private final Consumer consumer;
    private final RocketMQConsumerParam[] mqConsumerConnectVOS;

    public RocketMQTcpConsumer(
            RocketMQBorkerParam mqConnectVO,
            String groupId,
            RocketMQConsumerParam... mqConsumerConnectVOS
    ) {
        this.mqConnectVO = mqConnectVO;
        this.groupId = groupId;
        this.mqConsumerConnectVOS = mqConsumerConnectVOS;
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, groupId);
        properties.put(PropertyKeyConst.AccessKey, mqConnectVO.getAccessId());
        properties.put(PropertyKeyConst.SecretKey, mqConnectVO.getAccessKey());
        properties.put(PropertyKeyConst.NAMESRV_ADDR, mqConnectVO.getEndPoint());
        properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        consumer = ONSFactory.createConsumer(properties);

        //topic+tag 对应的 Listener
        Map<String, RocketMQPushMessageListener> topicTagToListenerMap = new HashMap<>();
        for (RocketMQConsumerParam mqConsumerConnectVO : mqConsumerConnectVOS) {
            topicTagToListenerMap.put(mqConsumerConnectVO.getTopicAndTagString(), mqConsumerConnectVO.getListener());
        }

        //合并topic相同的 RocketMQConsumerParam ,并将tagString 合并
        Map<String,RocketMQConsumerParam> topicToConsumerParamMap = new HashMap<>();
        for (RocketMQConsumerParam mqConsumerConnectVO : mqConsumerConnectVOS) {
            String topicName = mqConsumerConnectVO.getTopic().getName();
            if (topicToConsumerParamMap.get(topicName) == null){
                topicToConsumerParamMap.put(topicName,mqConsumerConnectVO);
            }else {
                String tagString =  topicToConsumerParamMap.get(topicName).getTagString() +"||"+ mqConsumerConnectVO.getTagString();
                mqConsumerConnectVO.setTagString(tagString);
                topicToConsumerParamMap.put(topicName,mqConsumerConnectVO);
            }
        }
        //将merge后的 RocketMQConsumerParamMap,转为list
        List<RocketMQConsumerParam> rocketMQConsumerParamList = new ArrayList<>(topicToConsumerParamMap.values());

        for (RocketMQConsumerParam rocketMQConsumerParam : rocketMQConsumerParamList) {
            String topicName = rocketMQConsumerParam.getTopic().getName();
            String tags = rocketMQConsumerParam.getTagString();
            consumer.subscribe(
                    topicName,
                    tags
                    , new MessageListener() { //订阅多个Tag。
                        @Override
                        public Action consume(Message message, ConsumeContext context) {
                            try {
                                String tag = message.getTag();//todo 如果是*会有bug  此处拿到的 message.getTag 肯定不是*, 所以从ListenerMap中拿,可能拿到空

                                String topicName = message.getTopic();
                                String topicNameAndTag = topicName+"_"+tag;
                                //从map中获取当前topic+tag对应的 Listener
                                RocketMQPushMessageListener listener = topicTagToListenerMap.get(topicNameAndTag);
                                MQMessageVO MQMessageVO = fromRocketTcpMessage(
                                        listener.getMessageType(), message);
                                boolean b = listener.process(MQMessageVO);
                                return b ? Action.CommitMessage : Action.ReconsumeLater;
                            } catch (Throwable e) {
                                return Action.ReconsumeLater;
                            }
                        }
                    });
        }
    }



    @Override
    public void start(RunType runType) {
        log.info("RocketMQTcpConsumer starting...");
        consumer.start();
        log.info("RocketMQTcpConsumer has started!");
    }

    @Override
    public void stop() {
        log.info("RocketMQTcpConsumer stoping...");
        consumer.shutdown();
        log.info("RocketMQTcpConsumer has stoped!");
    }


    public static <T> MQMessageVO<T> fromRocketTcpMessage(Type type, Message message) {
        MQMessageVO MQMessageVO = new MQMessageVO();

        MQMessageVO.setTag(message.getTag());
        MQMessageVO.setShardKey(message.getShardingKey());
        if (message.getUserProperties() != null) {
            MQMessageVO.putAll(message.getUserProperties());
        }
        MQMessageVO.setMessageId(message.getMsgID());
        MQMessageVO.setConsumedTimes(message.getReconsumeTimes());
        MQMessageVO.setPublishTime(message.getBornTimestamp());
        MQMessageVO.setFirstConsumeTime(message.getStartDeliverTime());
        MQMessageVO.setOffset(message.getOffset());
        MQMessageVO.setMessageId(message.getMsgID());
        byte[] bytes = message.getBody();
        Object body = ObjectUtils.byteArrayToObject(type, bytes);
        MQMessageVO.setObj(body);
        return MQMessageVO;
    }
}
